package com.skyline.courier.mq.provider.rabbitmq;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * RabbitMQ的Channel工厂，用于创建{@link Channel}对象
 * 
 * @author wuqh
 * 
 */
public class RabbitChannelFactory implements ShutdownListener {

	public static final int DEFAULT_CLOSE_CODE = AMQP.REPLY_SUCCESS;
	public static final String DEFAULT_CLOSE_MESSAGE = "Goodbye";

	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitChannelFactory.class);

	private RabbitConnectionFactory connectionFactory;
	private int closeCode = DEFAULT_CLOSE_CODE;
	private String closeMessage = DEFAULT_CLOSE_MESSAGE;

	private final Set<Reference<Channel>> channelReferenceSet = new HashSet<Reference<Channel>>();

	public void setConnectionFactory(RabbitConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public void setCloseCode(int closeCode) {
		this.closeCode = closeCode;
	}

	public void setCloseMessage(String closeMessage) {
		this.closeMessage = closeMessage;
	}

	public Channel createChannel() throws IOException {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("开始创建Channel对象");
		}

		Connection connection = connectionFactory.getConnection();
		connection.addShutdownListener(this);
		Channel channel = connection.createChannel();
		channelReferenceSet.add(new WeakReference<Channel>(channel));

		if (LOGGER.isInfoEnabled()) {
			LOGGER.info("创建Channel成功，Channel个数[" + channel.getChannelNumber() + "]");
		}
		return channel;
	}

	public void destroy() {
		closeChannels();
		connectionFactory.destroy();
	}

	private void closeChannels() {
		if (LOGGER.isInfoEnabled()) {
			LOGGER.info("即将关闭系统当前打开的[" + channelReferenceSet.size() + "]个Channel");
		}

		for (Reference<Channel> channelReference : channelReferenceSet) {

			try {
				Channel channel = channelReference.get();
				if (channel != null && channel.isOpen()) {
					if (channel.getConnection().isOpen()) {
						channel.close(closeCode, closeMessage);
					}
				}
			} catch (NullPointerException e) {
				LOGGER.error("关闭Channel失败", e);
			} catch (IOException e) {
				LOGGER.error("关闭Channel失败", e);
			}
		}
		if (LOGGER.isInfoEnabled()) {
			LOGGER.info("所有Channel已关闭");
		}

		channelReferenceSet.clear();

	}

	@Override
	public void shutdownCompleted(ShutdownSignalException cause) {
		if (cause.isInitiatedByApplication()) {
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info("系统被应用关闭，reference [" + cause.getReference() + "]，原因 [" + cause.getReason() + "]");
			}

		} else if (cause.isHardError()) {
			LOGGER.error("系统由于硬件问题关闭，reference [" + cause.getReference() + "]，原因 [" + cause.getReason() + "]");
		}
		if (LOGGER.isInfoEnabled()) {
			LOGGER.info("关闭系统完成");
		}
	}

	public RabbitConnectionFactory getConnectionFactory() {
		return connectionFactory;
	}
}
